package linc.fun.openai.stream.consumer;

import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSON;
import com.mybatisflex.core.query.QueryWrapper;
import com.rabbitmq.client.Channel;
import jakarta.annotation.Resource;
import linc.fun.openai.constants.ApplicationConstants;
import linc.fun.openai.constants.RedisKeyConstants;
import linc.fun.openai.domain.dto.mq.StreamMessage;
import linc.fun.openai.domain.entity.chat.ChatOrderDO;
import linc.fun.openai.domain.entity.chat.ChatOrderDelayFailureRecord;
import linc.fun.openai.domain.entity.chat.ChatOrderHistoryDO;
import linc.fun.openai.domain.entity.chat.ChatOrderItemDO;
import linc.fun.openai.enums.ChatOrderPayStatusEnum;
import linc.fun.openai.enums.ChatOrderStatusEnum;
import linc.fun.openai.exception.BizException;
import linc.fun.openai.manager.ChatProductStockManager;
import linc.fun.openai.service.ChatOrderDelayFailureRecordService;
import linc.fun.openai.service.ChatOrderHistoryService;
import linc.fun.openai.service.ChatOrderItemService;
import linc.fun.openai.service.ChatOrderService;
import linc.fun.openai.util.IdGenerator;
import linc.fun.openai.util.ObjectMapperUtil;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.support.atomic.RedisAtomicLong;
import org.springframework.messaging.Message;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static linc.fun.openai.domain.entity.chat.table.Tables.*;

/**
 * Registers the {@code chat-order-sink} stream consumer, which handles order-timeout messages:
 * an order that is neither {@code COMPLETED} nor {@code CLOSED} when its message arrives is closed,
 * its order history row is updated, and the stock held by its items is handed back to
 * {@link ChatProductStockManager}.
 *
 * <p>Messages are acknowledged manually. A message that fails is re-queued until the Redis retry
 * counter reaches {@link #MAX_RETRY_COUNT}; after that it is persisted as a
 * {@link ChatOrderDelayFailureRecord} and acknowledged.
 *
 * @author yqlin
 * @date 2023/5/10 14:47
 */
@Configuration
@Slf4j
public class ChatOrderStreamConsumer implements StreamConsumer {

    /** Number of recorded failures after which a message is persisted instead of being re-queued. */
    private static final long MAX_RETRY_COUNT = 3;

    /** Lease time, in minutes, of the per-order lock taken while a message is processed. */
    private static final long LOCK_LEASE_MINUTES = 5;

    @Resource
    private Redisson redisson;
    @Resource
    private ChatOrderService chatOrderService;
    @Resource
    private ChatOrderItemService chatOrderItemService;
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    @Resource
    private TransactionTemplate transactionTemplate;
    @Resource
    private ChatOrderHistoryService chatOrderHistoryService;
    @Resource
    private ChatOrderDelayFailureRecordService chatOrderDelayFailureRecordService;
    @Resource
    private IdGenerator idGenerator;
    @Resource
    private ChatProductStockManager chatProductStockManager;

    /**
     * Creates the message consumer bean; every received message is acknowledged manually.
     *
     * @return the consumer invoked for each message of the {@code chat-order-sink} binding
     */
    @Bean("chat-order-sink")
    public Consumer<Message<StreamMessage>> consume() {
        log.info("chat-order-sink-初始化订阅");
        return this::handleMessage;
    }

    /**
     * Processes one order-timeout message under a per-order Redisson lock.
     *
     * @param message the incoming message; its payload text is used as the order id and its headers
     *                must carry the AMQP channel and delivery tag
     * @throws IllegalArgumentException if the channel or delivery tag header is missing
     */
    private void handleMessage(Message<StreamMessage> message) {
        log.info("chat-order-sink-消息接收成功：{}", message.getPayload());
        Channel channel = message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
        Assert.notNull(channel, "chat-order-sink ==> channel 不能为空");

        Long deliveryTag = message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);
        Assert.notNull(deliveryTag, "chat-order-sink ==> delivery_tag 不能为空");

        StreamMessage streamMessage = message.getPayload();
        String orderId = streamMessage.getMessageText();
        // Idempotency: serialize handling of the same order id across consumers.
        RLock rLock = redisson.getLock(RedisKeyConstants.CHAT_ORDER_DELAY_CONSUMER_KEY + orderId);
        rLock.lock(LOCK_LEASE_MINUTES, TimeUnit.MINUTES);
        try {
            ChatOrderDO order = this.queryOrder(orderId);
            log.info("chat-order-sink ==> 当前订单信息: {}，订单状态: {}", JSON.toJSONString(order), order.getStatus().getDesc());
            // Original author's note on the status mapping - pay status 0: paying (order status
            // 0: pending payment, 1: pending confirmation); 1: paid (2: completed);
            // 2: payment failed (3: invalid order).
            boolean alreadyFinished = ChatOrderStatusEnum.COMPLETED.equals(order.getStatus())
                    || ChatOrderStatusEnum.CLOSED.equals(order.getStatus());
            if (alreadyFinished) {
                log.info("订单号: {}，已完成支付，当前订单状态: {}", order.getId(), order.getStatus().getDesc());
            } else {
                log.info("chat-order-sink ==> 开始处理超时订单(待付款|待确认)，当前订单信息: {}，订单状态: {}", JSON.toJSONString(order), order.getStatus().getDesc());
                // Load the items before closing so the stock to give back is known up front.
                List<ChatOrderItemDO> orderItems = this.queryOrderItems(order.getId());
                if (!this.closeOrderInTransaction(order)) {
                    throw BizException.of("关闭订单异常");
                }
                // NOTE(review): the close has already been committed at this point. If returning the
                // stock fails, the redelivered message will see a CLOSED order and be acknowledged
                // without the stock ever being returned - confirm whether returnProductStock should
                // take part in the transaction or be compensated elsewhere.
                chatProductStockManager.returnProductStock(orderItems);
            }
            // Manual ack, shared by both branches.
            channel.basicAck(deliveryTag, false);
            log.info("订单号: {}，手动确认当前消息已完成消费", order.getId());
        } catch (Exception e) {
            log.error("消费超时订单消息异常", e);
            this.handleFailure(streamMessage, orderId, channel, deliveryTag, e);
        } finally {
            // The lease may have expired while processing; unlocking a lock this thread no longer
            // holds makes Redisson throw IllegalMonitorStateException, so check ownership first.
            if (rLock.isHeldByCurrentThread()) {
                rLock.unlock();
            } else {
                log.warn("chat-order-sink ==> 订单号: {}，锁已过期或不再由当前线程持有，跳过解锁", orderId);
            }
        }
    }

    /**
     * Closes the order inside a transaction, rolling back on any exception.
     *
     * @param order the order to close
     * @return {@code true} if the close completed, {@code false} if it failed and was marked for rollback
     */
    private boolean closeOrderInTransaction(ChatOrderDO order) {
        Boolean closed = transactionTemplate.execute(status -> {
            try {
                this.closeOrder(order);
                return true;
            } catch (Exception e) {
                status.setRollbackOnly();
                log.error("关闭订单异常", e);
                return false;
            }
        });
        return Boolean.TRUE.equals(closed);
    }

    /**
     * Decides what to do with a message whose processing failed: re-queue it while the Redis retry
     * counter is below {@link #MAX_RETRY_COUNT}, otherwise persist it as a failure record and ack it.
     *
     * @param streamMessage the payload of the failed message
     * @param orderId       the order id taken from the payload, used in the retry counter key
     * @param channel       the AMQP channel the message was delivered on
     * @param deliveryTag   the delivery tag of the message
     * @param cause         the exception that made processing fail
     */
    private void handleFailure(StreamMessage streamMessage, String orderId, Channel channel, long deliveryTag, Exception cause) {
        final String retryKey = RedisKeyConstants.CHAT_ORDER_RETRY_COUNT_KEY + orderId;
        RedisAtomicLong retryCounter = new RedisAtomicLong(retryKey, Objects.requireNonNull(redisTemplate.getConnectionFactory()));
        try {
            // ">=" rather than "==": a counter that somehow passed the limit must not re-queue forever.
            if (retryCounter.get() >= MAX_RETRY_COUNT) {
                redisTemplate.delete(retryKey);
                // The retry budget is used up: persist the message, then ack it so it leaves the queue.
                log.info("超时订单消息，订单号: {}，消息重复失败3次，进行消息落库", orderId);
                this.saveFailureDelayMessageRecord(streamMessage, cause);
                channel.basicAck(deliveryTag, false);
            } else {
                long retryCount = retryCounter.incrementAndGet();
                log.info("超时订单消息，订单号: {}，正在重新回队列，重试第{}次", orderId, retryCount);
                // requeue=true puts the message back on the queue; false would drop or dead-letter it.
                channel.basicReject(deliveryTag, true);
            }
        } catch (IOException ex) {
            log.error("消费超时订单消息，订单号: {}，重新回队列异常 ", orderId, ex);
        }
    }

    /**
     * Persists a message that could not be processed as a {@link ChatOrderDelayFailureRecord}.
     *
     * @param streamMessage the payload to store, serialized to JSON
     * @param e             the failure; its message is stored, or its {@code toString()} when it has none
     */
    private void saveFailureDelayMessageRecord(StreamMessage streamMessage, Exception e) {
        ChatOrderDelayFailureRecord failureRecord = new ChatOrderDelayFailureRecord();
        failureRecord.setId(idGenerator.getId());
        // Exceptions such as NullPointerException often carry no message; keep at least the type.
        failureRecord.setErrorMessage(e.getMessage() != null ? e.getMessage() : e.toString());
        failureRecord.setMessagePayload(ObjectMapperUtil.toJson(streamMessage));
        failureRecord.setRetryCount(0);
        failureRecord.setCreateTime(LocalDateTime.now());
        chatOrderDelayFailureRecordService.save(failureRecord);
    }

    /**
     * Loads the id and status of an order.
     *
     * @param orderId the order id as text; parsed with {@link Long#valueOf(String)}
     * @return the order, never {@code null}
     * @throws NumberFormatException if {@code orderId} is not a valid long
     * @throws BizException          {@code INVALID_ORDER} if no such order exists
     */
    @NotNull
    private ChatOrderDO queryOrder(String orderId) {
        ChatOrderDO order = chatOrderService.getOne(QueryWrapper.create()
                .select(ChatOrder.Id, ChatOrder.Status)
                .where(ChatOrder.Id.eq(Long.valueOf(orderId))));
        if (Objects.isNull(order)) {
            throw BizException.INVALID_ORDER;
        }
        return order;
    }

    /**
     * Marks the order as {@code CLOSED} and mirrors the change onto its order history row.
     *
     * @param order the order to close; its status, update time and remark are modified in place
     * @throws BizException if the order has no history row
     */
    private void closeOrder(ChatOrderDO order) {
        log.info("订单号: {}，未支付正在被关闭，当前订单状态: {}", order.getId(), order.getStatus().getDesc());
        order.setStatus(ChatOrderStatusEnum.CLOSED);
        order.setUpdateTime(LocalDateTime.now());
        order.setRemark(ApplicationConstants.CHAT_ORDER_TIMEOUT_CLOSE_ORDER_REMARK);
        chatOrderService.updateById(order);
        log.info("订单号: {}，未支付已完成关闭，当前订单状态: {}", order.getId(), order.getStatus().getDesc());
        ChatOrderHistoryDO orderHistory = chatOrderHistoryService.getOne(QueryWrapper.create()
                .where(ChatOrderHistory.OrderId.eq(order.getId())));
        if (Objects.isNull(orderHistory)) {
            // Fail with a clear cause instead of a NullPointerException; the caller rolls back.
            throw BizException.of("订单号: " + order.getId() + "，历史订单记录不存在");
        }
        orderHistory.setOrderStatus(order.getStatus());
        orderHistory.setPayStatus(ChatOrderPayStatusEnum.PAYMENT_CLOSED);
        orderHistory.setUpdateTime(LocalDateTime.now());
        orderHistory.setRemark(ApplicationConstants.CHAT_ORDER_TIMEOUT_CLOSE_ORDER_REMARK);
        chatOrderHistoryService.updateById(orderHistory);
        log.info("更新历史订单完成");
    }

    /**
     * Loads the items of an order.
     *
     * @param orderId the id of the order
     * @return the order's items, never {@code null} or empty
     * @throws BizException {@code DATA_HAS_BEEN_LOOSED} if the order has no items
     */
    @NotNull
    private List<ChatOrderItemDO> queryOrderItems(Long orderId) {
        QueryWrapper oiWrapper = QueryWrapper.create()
                .select(ChatOrderItem.Id,
                        ChatOrderItem.OrderId,
                        ChatOrderItem.ProductId,
                        ChatOrderItem.ProductName,
                        ChatOrderItem.ExchangeNum)
                .and(ChatOrderItem.OrderId.eq(orderId));
        List<ChatOrderItemDO> orderItems = chatOrderItemService.list(oiWrapper);
        if (CollUtil.isEmpty(orderItems)) {
            throw BizException.DATA_HAS_BEEN_LOOSED;
        }
        return orderItems;
    }

}
